Kinesis Data Streamsにデータが送信されている状態で、Lambdaをデプロイした際の動作を確認してみた
ふと気になりました。「Lambdaのデプロイ中にKinesis Data Streamsにデータを流し続けると、どういう動きになるんだろう?」と。
- ある時刻を境にバッチリ変わる?
- デプロイ付近で新しいLambdaと古いLambdaがランダムに実行される?
試してみました。
おすすめの方
- AWS SAMを使ってみたい方
- LambdaとDynamoDBを使ってみたい方
- Kinesis Data Streamsにデータが送信されている状態でLambdaをデプロイした場合の挙動に興味がある方
ざっくり概要
Kinesis Data Streamsとその後段にLambdaがあります。このLambdaでは受け取ったデータをDynamoDBに書き込みます。 Kinesis Data Stremasにデータを送信し続けている状態でLambdaをデプロイ(コード変更)したとき、どのような動作になるのかを試してみました。
まずはKinesis Data StreamsとLambdaを作成する
SAM Init
sam init \ --runtime python3.7 \ --name kinesis-lambda-deploy-sample \ --app-template hello-world
SAMテンプレートファイル
Kinesis Data Streamsのシャード数は外部パラメータで渡すようにしています。
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: kinesis-lambda-deploy-sample Parameters: ShardCount: Type: Number Resources: TestStream: Type: AWS::Kinesis::Stream Properties: Name: !Sub todo-sample-${ShardCount}-stream ShardCount: !Sub ${ShardCount} TodoFunction: Type: AWS::Serverless::Function Properties: CodeUri: hello_world/ Handler: app.lambda_handler Runtime: python3.7 Timeout: 10 AutoPublishAlias: Sample Environment: Variables: TABLE_NAME: !Ref TodoTable Policies: - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess Events: Stream: Type: Kinesis Properties: Stream: !GetAtt TestStream.Arn StartingPosition: LATEST BatchSize: 2 TodoFunctionLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/lambda/${TodoFunction} TodoTable: Type: AWS::DynamoDB::Table Properties: TableName: !Sub todo-sample-${ShardCount}-table AttributeDefinitions: - AttributeName: todoId AttributeType: S KeySchema: - AttributeName: todoId KeyType: HASH BillingMode: PAY_PER_REQUEST
Lambdaコード
DynamoDBに書き込むデータにタイムスタンプとlambda:aaa
を含めています。新しいLambdaではlambda:aaa
を変更することで、デプロイ前後どちらのLambdaが書き込んだのかを区別できるようにしています。
import base64 import json import os import boto3 from datetime import datetime, timezone dynamodb = boto3.resource('dynamodb') table_name = os.environ['TABLE_NAME'] def lambda_handler(event, context): table = dynamodb.Table(table_name) for record in event['Records']: b64_data = record['kinesis']['data'] data = base64.b64decode(b64_data) payload = json.loads(data) table.put_item(Item={ 'todoId': payload['todoId'], 'title': payload['title'], 'createdAt': int(datetime.now(timezone.utc).timestamp() * 1000), 'lambda': 'aaa' })
AWS SAMデプロイ
下記でデプロイします。ShardCount
は1
にしています。
sam build sam package \ --output-template-file packaged.yaml \ --s3-bucket cm-fujii.genki-sam-test-bucket sam deploy \ --template-file packaged.yaml \ --stack-name kinesis-lambda-deploy-sample-1-stack \ --capabilities CAPABILITY_NAMED_IAM \ --no-fail-on-empty-changeset \ --parameter-overrides ShardCount=1
Kinesis Data Streamsにデータを送信し続けるスクリプトを作成する
下記のPythonスクリプトを作成します。同ディレクトリにfinish.txt
が置かれるまで、APIアクセスを続けます。KINESIS_STREAM_NAME
には作成したKinesis Data Stremasの名前を記載します。
import sys import os import json import time import boto3 FINISH_FILE = 'finish.txt' KINESIS_STREAM_NAME = 'todo-sample-1-stream' client = boto3.client('kinesis') def put(index): count = 1 result_ok = 0 result_ng = 0 while True: payload = get_payload(index, count) try: client.put_record(**payload) except Exception: result_ng += 1 raise else: result_ok += 1 if is_finish(): break time.sleep(0.3) count += 1 print(f'ok: {result_ok}') print(f'ng: {result_ng}') def get_payload(index, count): return { 'StreamName': KINESIS_STREAM_NAME, 'PartitionKey': f'test-{index:02}', 'Data': json.dumps({ 'todoId': f't{index:02}-{count:04}', 'title': 'アレを買う' }) } def is_finish(): if os.path.isfile(FINISH_FILE): return True return False if __name__ == "__main__": args = sys.argv if len(args) == 2: put(int(args[1]))
Kinesis Data Stremasにデータを送信しながらLambdaをデプロイする(シャード数:1)
Lambdaコードを修正する
下記に変更します。DynamoDBに書き込むデータについて、一部をaaa
からxxxxxx
に変更しています。 これによって、デプロイ前後のどちらのLambdaから書き込んだのか区別できます。
import base64 import json import os import boto3 from datetime import datetime, timezone dynamodb = boto3.resource('dynamodb') table_name = os.environ['TABLE_NAME'] def lambda_handler(event, context): table = dynamodb.Table(table_name) for record in event['Records']: b64_data = record['kinesis']['data'] data = base64.b64decode(b64_data) payload = json.loads(data) table.put_item(Item={ 'todoId': payload['todoId'], 'title': payload['title'], 'createdAt': int(datetime.now(timezone.utc).timestamp() * 1000), 'lambda': 'xxxxxx' })
Kinesis Data Stremasにデータ送信開始する
手元のPCからKinesis Data Stremasに対して、データ送信を開始します。
python putter.py 1 & python putter.py 2 & python putter.py 3 &
デプロイする
下記コマンドでデプロイします。
sam build sam package \ --output-template-file packaged.yaml \ --s3-bucket cm-fujii.genki-sam-test-bucket sam deploy \ --template-file packaged.yaml \ --stack-name kinesis-lambda-deploy-sample-1-stack \ --capabilities CAPABILITY_NAMED_IAM \ --no-fail-on-empty-changeset \ --parameter-overrides ShardCount=1
デプロイ完了後、データ送信を停止する
touch finish.txt
結果
CloudWatch Logsの様子
ログストリームは3つありましたが、ログの時刻的にデプロイ前Lambdaの同時実行数は1つでした。
DynamoDBテーブルの様子
DynamoDBテーブルから全データを取得し、時刻順にソートしました(一部抜粋)。 デプロイ時刻付近で新しいLambdaと古いLambdaが混在実行されていることが分かります。
1604298637442: t03-0096, aaa 1604298637542: t02-0184, aaa 1604298637562: t01-0246, aaa 1604298637645: t03-0097, aaa 1604298639483: t02-0185, xxxxxx 1604298639712: t01-0247, xxxxxx 1604298640558: t03-0098, aaa 1604298640773: t02-0186, aaa 1604298640872: t01-0248, xxxxxx 1604298640925: t03-0099, xxxxxx 1604298641023: t02-0187, aaa 1604298641078: t01-0249, aaa 1604298641142: t03-0100, aaa 1604298641174: t02-0188, aaa 1604298641285: t01-0250, xxxxxx 1604298641343: t03-0101, xxxxxx 1604298641443: t02-0189, xxxxxx 1604298641483: t01-0251, xxxxxx 1604298641583: t03-0102, xxxxxx 1604298641633: t02-0190, xxxxxx 1604298641723: t01-0252, xxxxxx 1604298641763: t03-0103, xxxxxx 1604298641826: t02-0191, aaa 1604298641874: t01-0253, aaa 1604298641945: t03-0104, xxxxxx 1604298642003: t02-0192, xxxxxx 1604298642093: t01-0254, aaa 1604298642134: t03-0105, aaa 1604298642213: t02-0193, xxxxxx 1604298642263: t01-0255, xxxxxx 1604298642364: t03-0106, xxxxxx 1604298642403: t02-0194, xxxxxx 1604298642487: t01-0256, aaa 1604298642534: t03-0107, aaa 1604298642620: t02-0195, xxxxxx 1604298642683: t01-0257, xxxxxx 1604298642765: t03-0108, xxxxxx 1604298642823: t02-0196, xxxxxx 1604298642903: t01-0258, xxxxxx 1604298642962: t03-0109, xxxxxx 1604298643047: t02-0197, aaa 1604298643144: t01-0259, xxxxxx 1604298643183: t03-0110, xxxxxx 1604298643290: t02-0198, xxxxxx 1604298643345: t01-0260, xxxxxx 1604298643446: t03-0111, xxxxxx 1604298643503: t02-0199, xxxxxx
Kinesis Data Stremasにデータを送信しながらLambdaをデプロイする(シャード数:2)
シャード数を2
にして、Lambdaの同時実行数を増やして試しました。
デプロイ
下記でデプロイします。ShardCount
を2
にしています。
sam build sam package \ --output-template-file packaged.yaml \ --s3-bucket cm-fujii.genki-sam-test-bucket sam deploy \ --template-file packaged.yaml \ --stack-name kinesis-lambda-deploy-sample-2-stack \ --capabilities CAPABILITY_NAMED_IAM \ --no-fail-on-empty-changeset \ --parameter-overrides ShardCount=2
実験の手順はさきほどと同じのため省略します。
結果
CloudWatch Logsの様子
Lambdaの同時期同数は2になりました。
DynamoDBテーブルの様子
DynamoDBテーブルから全データを取得し、時刻順にソートしました。 こちらも、デプロイ時刻付近で新しいLambdaと古いLambdaが混在実行されていることが分かります。
1604300178013: t03-0128, aaa 1604300178022: t01-0235, aaa 1604300178733: t02-0201, aaa 1604300178774: t02-0202, aaa 1604300179379: t03-0129, yyyyyy 1604300179437: t02-0203, yyyyyy 1604300179603: t01-0236, yyyyyy 1604300179698: t03-0130, yyyyyy 1604300179707: t01-0237, yyyyyy 1604300180344: t02-0204, aaa 1604300180509: t03-0131, aaa 1604300180572: t02-0205, aaa 1604300180725: t02-0206, yyyyyy 1604300180768: t01-0238, aaa 1604300180832: t03-0132, aaa 1604300180863: t02-0207, aaa 1604300180911: t02-0208, aaa 1604300180923: t01-0239, aaa 1604300180970: t02-0209, aaa 1604300181004: t02-0210, aaa 1604300181011: t03-0133, yyyyyy 1604300181039: t01-0240, yyyyyy 1604300181104: t03-0134, aaa 1604300181144: t01-0241, aaa 1604300181753: t02-0211, yyyyyy 1604300181758: t03-0135, yyyyyy 1604300181799: t02-0212, yyyyyy 1604300181816: t01-0242, yyyyyy 1604300181884: t02-0213, aaa 1604300181916: t03-0136, yyyyyy 1604300181956: t01-0243, yyyyyy 1604300182058: t03-0137, aaa 1604300182104: t01-0244, aaa 1604300182182: t03-0138, yyyyyy 1604300182236: t01-0245, yyyyyy 1604300182316: t01-0246, yyyyyy 1604300182396: t03-0139, yyyyyy 1604300182457: t03-0140, aaa 1604300182504: t01-0247, aaa 1604300182584: t03-0141, yyyyyy 1604300182616: t01-0248, yyyyyy 1604300182766: t02-0214, yyyyyy 1604300182770: t03-0142, yyyyyy 1604300182799: t01-0249, yyyyyy
さいごに
「デプロイ付近で新しいLambdaと古いLambdaがランダムに実行される」という結果になりました。だからどうだというわけではないですが、なにかの際に役立つかもしれません。